Conversation
Signed-off-by: Kamran Jafari <kjafarisadeg@nvidia.com>
/ok to test df7e912
/ok to test 1cf57ee
…data parallelism
…oder
…rgonDataModule
/ok to test 0aace0e
@coderabbitai review
✅ Actions performed: Review triggered.
📝 Walkthrough

Adds CP-aware rank/size extraction and logging to the Energon datamodule, introduces webdataset-based multimodal decoding (image/video) via new handlers and ChatMLWebdataset, updates task encoder/sample types, adds extensive CP and encoder unit/functional tests, a VLM example script, recipe dataset_type plumbing, and a new provider flag.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Shards as WebDataset Shards
    participant Factory as ChatMLWebdataset
    participant Decoder as DefaultDecoderWebdatasetFactory
    participant ImgH as imagehandler
    participant VidH as videohandler
    participant Sample as ChatMLSample
    Shards->>Factory: request sample
    Factory->>Decoder: construct/auto_decode pipeline
    Decoder->>ImgH: register image handler
    Decoder->>VidH: register video handler
    Factory->>Decoder: decode entries (images/videos)
    Decoder->>ImgH: decode image bytes -> torch.Tensor
    Decoder->>VidH: decode video -> frames
    VidH->>ImgH: delegate frame decoding
    ImgH->>Sample: populate imgs / frames tensors
    Decoder->>Sample: populate videos list
    Sample-->>Shards: return populated ChatMLSample
```
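To make the handler flow above concrete, here is a dependency-free sketch of an image/video handler pair in which the video handler delegates per-frame decoding, as the diagram shows. The class names follow the walkthrough, but the byte formats and the list-based stand-ins for torch.Tensor are assumptions for illustration only.

```python
# Sketch of the diagram's handler flow. Names follow the walkthrough;
# the list-based "tensor" stand-ins and byte formats are assumptions.

class imagehandler:
    """Decode raw image bytes into a frame (list stand-in for torch.Tensor)."""
    def __call__(self, key: str, data: bytes):
        if not key.lower().endswith((".jpg", ".png")):
            return None  # webdataset convention: None means "not handled"
        return list(data)  # placeholder for real pixel decoding

class videohandler:
    """Decode a video as a list of frames, delegating each to imagehandler."""
    def __init__(self, image_decoder: imagehandler):
        self.image_decoder = image_decoder
    def __call__(self, key: str, data: list[bytes]):
        if not key.lower().endswith(".mp4"):
            return None
        # Each frame's bytes are decoded by the image handler, as in the diagram.
        return [self.image_decoder("frame.jpg", frame) for frame in data]

img_h = imagehandler()
vid_h = videohandler(img_h)
frames = vid_h("clip.mp4", [b"\x00\x01", b"\x02"])
print(len(frames))  # 2
```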
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs
Suggested labels
Suggested reviewers
🚥 Pre-merge checks: ✅ 3 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (3 passed)
Actionable comments posted: 3
🧹 Nitpick comments (5)
tests/functional_tests/data/energon/test_base_energon_datamodule.py (1)
177-183: Consider moving mock-only CP tests to `tests/unit_tests/`.

`TestEnergonDataModuleCPHandling` and `TestEnergonDataShardingVerification` are fully mocked (no real distributed initialization) and test functions in isolation. The coding guidelines place unit tests in `tests/unit_tests/` and reserve `tests/functional_tests/` for integration tests requiring process isolation or larger artifacts. Co-locating is understandable since the existing functional test is here, but the distinction helps with CI filtering and test execution time.

As per coding guidelines: "Write unit tests using pytest for functions in isolation, stored at tests/unit_tests" and "Place functional tests in 'tests/functional_tests/'"
src/megatron/bridge/data/energon/base_energon_datamodule.py (1)
187-194: `cp_size` and `cp_rank` are fetched solely for logging — consider documenting this intent.

These variables serve a pure observability purpose, which is useful for debugging distributed setups. A brief inline comment (e.g., `# Logged for debugging; not used in WorkerConfig`) would prevent future contributors from wondering if they were accidentally omitted from `WorkerConfig`.

src/megatron/bridge/recipes/qwen_vl/data/energon/task_encoder.py (3)
140-154: `_resolve_hf_mm_token_ids` — catching bare `Exception` is overly broad.

Line 149 catches all exceptions, which could mask unexpected errors (e.g., `AttributeError` if `convert_tokens_to_ids` is misconfigured). Consider narrowing to `(KeyError, ValueError, TypeError)` to only catch expected failure modes.

Proposed fix

```diff
     try:
         return int(hf_tokenizer.convert_tokens_to_ids(token_str))
-    except Exception:
+    except (KeyError, ValueError, TypeError):
         return default_id
```
23-23: Use built-in generics and `|` union syntax per Python 3.10+ guidelines.

The coding guidelines require `list`, `dict` instead of `List`, `Dict` from `typing`, and `T | None` instead of `Optional[T]`.

Proposed fix

```diff
-from typing import Dict, List, Optional
+from collections.abc import Sequence
```

Then update usages on the changed lines:

```diff
-    imgs: Optional[List[torch.Tensor]] = None
-    videos: Optional[List[torch.Tensor]] = None
+    imgs: list[torch.Tensor] | None = None
+    videos: list[torch.Tensor] | None = None
```

As per coding guidelines: "Use 'T | None' for nullable types instead of 'Optional[T]'" and "Use built-in generics (list, dict, tuple) instead of typing equivalents".
166-185: Class name `videohandler` should be `VideoHandler` (PascalCase).

The coding guidelines require PascalCase for class names. Also, the docstring has a grammar error: "Create an video handler" → "Create a video handler."

Proposed fix

```diff
-class videohandler:
-    """Create an video handler."""
+class VideoHandler:
+    """Create a video handler."""
```

Note: This rename would also need to be updated in `ChatMLWebdataset.__init__` (line 203), the test file import (line 36), and test usage (line 120).

As per coding guidelines: "Use PascalCase for class names".
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/megatron/bridge/recipes/qwen_vl/data/energon/task_encoder.py`:
- Around line 174-179: The code calls pickle.loads(data) inside __call__ with
data from webdataset shards, which is unsafe for untrusted inputs; update the
function to either (A) document the trust assumption clearly (add a comment
above __call__ stating that data must come from trusted/curated sources), or (B)
replace the deserialization with a safer format/loader (e.g., JSON/msgpack or a
vetted safe_unpickle helper) and handle errors with a try/except to avoid
crashing on malformed data; reference the __call__ method and the extension
check (self.extensions) when making the change so the logic and error handling
remain consistent.
- Around line 188-205: Docstring claims custom handlers for image, audio, and
video, but ChatMLWebdataset.__init__ only registers imagehandler and
videohandler on self._decoder; update the implementation or docstring: either
register the missing audio handler (e.g., add audiohandler(self.audio_decode) to
the Decoder list) and ensure an appropriate audio_decode method exists, or
change the class docstring to remove "audio" so it accurately reflects handlers
currently registered (imagehandler and videohandler).
- Line 16: Remove the unused import causing the linter failure: delete the
top-level "import io" statement in
src/megatron/bridge/recipes/qwen_vl/data/energon/task_encoder.py so no unused
modules remain; ensure no other references to "io" exist in functions or classes
within that module (e.g., TaskEncoder or any helper functions) before
committing.
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Signed-off-by: Kamran Jafari <kjafarisadeg@nvidia.com>
/ok to test 1477d91
…ists of tensors
…onfigurations
@coderabbitai review
✅ Actions performed: Review triggered.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/megatron/bridge/data/energon/energon_provider.py (1)
36-49: ⚠️ Potential issue | 🟠 Major

`pack_sequences_in_batch` is declared but never forwarded — the flag is a silent no-op.

`pack_sequences_in_batch` is added to `EnergonProvider` but is never passed to `EnergonMultiModalDataModule` in `build_datasets`. Any caller that sets `pack_sequences_in_batch=True` will silently receive an unpacked dataloader, making the feature completely non-functional as shipped. Either wire it through:

🛠 Proposed fix

```diff
 dataset = EnergonMultiModalDataModule(
     path=self.path,
     tokenizer=context.tokenizer if context.tokenizer is not None else self.tokenizer,
     image_processor=self.image_processor,
     seq_length=self.seq_length,
     task_encoder=self.task_encoder,
     micro_batch_size=self.micro_batch_size,
     global_batch_size=self.global_batch_size,
     num_workers=self.num_workers,
+    pack_sequences_in_batch=self.pack_sequences_in_batch,
 )
```

…or, if `EnergonMultiModalDataModule` doesn't yet support this parameter, raise `NotImplementedError` when `pack_sequences_in_batch=True` to fail fast rather than silently.
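If the datamodule cannot accept the flag yet, the fail-fast alternative suggested above could be sketched as follows; the provider fields and return value here are simplified stand-ins, not the real class.

```python
from dataclasses import dataclass

@dataclass
class EnergonProvider:
    """Simplified stand-in; the real provider has many more fields."""
    path: str
    pack_sequences_in_batch: bool = False

    def build_datasets(self):
        if self.pack_sequences_in_batch:
            # Fail fast instead of silently ignoring the flag.
            raise NotImplementedError(
                "pack_sequences_in_batch is not yet forwarded to "
                "EnergonMultiModalDataModule"
            )
        return {"path": self.path}  # placeholder for the real datamodule

print(EnergonProvider(path="/data").build_datasets())  # {'path': '/data'}
```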
♻️ Duplicate comments (1)
src/megatron/bridge/recipes/qwen_vl/data/energon/task_encoder.py (1)
173-178: `pickle.loads` deserialization risk — already flagged in a previous review.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/megatron/bridge/recipes/qwen_vl/data/energon/task_encoder.py` around lines 173 - 178, In __call__, do not use pickle.loads directly because of unsafe deserialization; instead either decode JSON (e.g., json.loads(data.decode())) if the payload is JSON, or, if you must unpickle, replace pickle.loads(data) with a safe unpickler: implement a RestrictedUnpickler that overrides find_class and only allows a whitelist of expected classes/modules, then call RestrictedUnpickler(io.BytesIO(data)).load(); update the variable 'data' assignment in __call__ to use that safe method.
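For reference, a minimal RestrictedUnpickler along the lines the prompt describes might look like this; the whitelist contents are an assumption, and a real allow-list would need to cover whatever types the shards legitimately contain.

```python
import io
import pickle

# Hypothetical allow-list for illustration; tune it to the sample schema.
_ALLOWED = {
    ("builtins", "dict"),
    ("builtins", "list"),
    ("builtins", "str"),
    ("builtins", "int"),
}

class RestrictedUnpickler(pickle.Unpickler):
    def find_class(self, module, name):
        # Only resolve globals on the explicit whitelist; everything else
        # is rejected, which blocks arbitrary-code-execution gadgets.
        if (module, name) in _ALLOWED:
            return super().find_class(module, name)
        raise pickle.UnpicklingError(f"forbidden global: {module}.{name}")

def safe_loads(data: bytes):
    return RestrictedUnpickler(io.BytesIO(data)).load()

payload = pickle.dumps({"role": "user", "content": "hi"})
print(safe_loads(payload))  # plain containers unpickle fine
```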
🧹 Nitpick comments (1)
src/megatron/bridge/recipes/qwen_vl/data/energon/task_encoder.py (1)
22-22: Use built-in generics and `|` union syntax instead of `typing` equivalents.

Line 22 imports `Dict`, `List`, and `Optional`; lines 161–162 then use `Optional[List[...]]` in the new `ChatMLSample` fields. Per coding guidelines, use `list`/`dict` built-in generics and `T | None` for nullable types.

♻️ Proposed fix

```diff
-from typing import Dict, List, Optional
+from typing import Dict, List  # kept only for pre-existing annotations not in changed lines
```

For the new fields specifically:

```diff
-    imgs: Optional[List[torch.Tensor]] = None
-    videos: Optional[List[List[torch.Tensor]]] = None
+    imgs: list[torch.Tensor] | None = None
+    videos: list[list[torch.Tensor]] | None = None
```

As per coding guidelines: "Use built-in generics (list, dict, tuple) instead of typing equivalents" and "Use 'T | None' for nullable types instead of 'Optional[T]'".

Also applies to: 161-162
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@examples/models/vlm/qwen3_vl/energon_test.sh`:
- Line 2: Update the copyright header string at the top of energon_test.sh which
currently reads "Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved."
to use the correct year 2026; locate the header line in the file and change the
year token from 2025 to 2026 so the file reflects the PR creation year.
- Around line 1-17: The script lacks strict shell options; insert "set -euo
pipefail" immediately after the shebang in energon_test.sh to ensure the script
exits on errors, treats unset variables as failures, and propagates pipe
failures; update startup of the script (around the top where WORKSPACE is
defined) so the shell options are set before any variable expansion or commands
execute.
- Line 49: Replace the direct Python invocation with the project's wrapper:
change the command that currently starts with "python -m torch.distributed.run
--nproc_per_node=$N_PROC scripts/training/run_recipe.py" to use "uv run" (e.g.,
"uv run python -m torch.distributed.run --nproc_per_node=$N_PROC
scripts/training/run_recipe.py") so the script invocation for
torch.distributed.run / scripts/training/run_recipe.py follows the coding
guideline requiring uv run in shell/example scripts.
- Line 48: The echo currently prints DP=$N_PROC which is wrong because N_PROC is
total processes; compute actual data-parallel degree DP as DP=$(( N_PROC / (EP *
TP * PP * CP) )) (or set DP earlier when you compute it) and update the echo
line to print DP (and/or include the formula) instead of N_PROC; refer to the
variables N_PROC, EP, TP, PP, CP and the echo statement that currently contains
"DP=$N_PROC" to locate and fix the line.
In `@src/megatron/bridge/recipes/qwen_vl/data/energon/task_encoder.py`:
- Around line 165-166: Rename the class `videohandler` to `VideoHandler` and
update its docstring from "Create an video handler." to "Create a video
handler."; then update all references/imports that use `videohandler` (e.g.,
test imports like `from ... import videohandler`) and any `dataset.yaml` or
config entries that reference the class by name so they use `VideoHandler`
instead to keep naming consistent with the coding guidelines.
- Line 179: The code checks extension.lower() for membership but then uses the
original-cased variable `extension` to index `self.extensions_mapping`, which
causes KeyError for mixed/upper-case inputs; update the code to normalize the
extension before lookup (e.g., compute a lowercased `ext_lower =
extension.lower()` or reassign `extension = extension.lower()`), use that
normalized value for both the guard and when computing `key =
self.extensions_mapping[...]`, and ensure any subsequent usage in the same scope
uses the normalized name so lookups against `self.extensions_mapping` succeed.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- examples/models/vlm/qwen3_vl/energon_test.sh
- scripts/training/run_recipe.py
- src/megatron/bridge/data/energon/energon_provider.py
- src/megatron/bridge/recipes/qwen_vl/data/energon/task_encoder.py
…ning
… improve conversation handling
/ok to test d42610f
…tron-Bridge into kamran/qwen3_vl_energon
…ergon modules
/ok to test 853dc57
…skEncoder
/ok to test 5047bd6
…VLTaskEncoder
/ok to test 9390e19
What does this PR do?
New Features:
Finetuning runs with an example Energon dataset, showing parity for different CP sizes and Seq. packing configurations:
Changelog
GitHub Actions CI
See the CI section in the Contributing doc for how to trigger the CI. An NVIDIA developer will need to approve and trigger the CI for external contributors.
Before your PR is "Ready for review"
Pre checks:
If you haven't finished some of the above items, you can still open a "Draft" PR.
Additional Information
Summary by CodeRabbit
New Features
Improvements
Tests